package com.hzmg.netty;

import com.hzmg.netty.model.TestModel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

import java.nio.charset.Charset;
import java.util.List;

/**
 * 根据传输数据类型进行不同处理demo示例
 */
public class MultiHandleServerDemo {
    private static final MyFirstServerHandler MY_FIRST_SERVER_HANDLER = new MyFirstServerHandler();

    public static void main(String[] args) throws Exception {
        //创建两个线程组 boosGroup,workerGroup Nio：非阻塞
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务端启动器对象 ServerBootstrap:服务端启动器工厂类
            ServerBootstrap bootstrap = new ServerBootstrap();
            //将上面创建的两个线程组添加进服务端启动对象
            bootstrap.group(bossGroup, workerGroup)
                    //设置服务端通道实现类型，这里使用socket传输通道
                    .channel(NioServerSocketChannel.class)
                    //设置线程队列能得到的最大连接个数，该参数起限流作用，
                    //线程队列指第三次握手时的acept_queue队列
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //设置保持活动连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //使用匿名内部类的形式初始化通道对象
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //给pipeline管道设置处理器,这个管道就是用来接收io传来的信息，并在里面进行处理
                            //socketChannel.pipeline().addLast(myFirstServerHandler);
                            try {
                                socketChannel.pipeline()
                                        .addLast(new ObjectDecoder(s->TestModel.class))
                                        .addLast(new ServerHandlerForObject())
                                        .addLast(new StringDecoder())
                                        .addLast(new ServerHandlerForString());
                            }catch (Exception ignore){

                            }

                        }
                    });//这里是给workerGroup的EventLoop对应的管道设置处理器
            System.out.println("[MultiHandleServerDemo]:服务端准备就绪");
            //绑定端口号，启动服务端
            ChannelFuture channelFuture = bootstrap.bind(8888).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭队列组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


}
/**
 * 处理String类型的处理器
 * 自定义的Handler需要继承Netty规定好的HandlerAdapter
 * 才能被Netty框架所关联，类似SpringMVC的适配器模式
 */
class ServerHandlerForString extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送过来的消息
        if(msg instanceof String){
            System.out.println("[服务端]：收到客户端" + ctx.channel().remoteAddress() + "发送的消息: " + (String) msg);
        }else {
            //传输到下一个处理器
            System.out.println("[服务端]：非String ，转发给下一个处理器");
            ctx.fireChannelRead(msg);
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //解析完收到的消息后要回传给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,正在处理或转发", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //异常捕获，以及对应处理，这里的处理为关闭通道
        System.out.println("[服务端]:发现异常,异常为: "+cause.toString());

        ctx.close();
    }
}
/**
 * 处理String类型的处理器
 * 自定义的Handler需要继承Netty规定好的HandlerAdapter
 * 才能被Netty框架所关联，类似SpringMVC的适配器模式
 */
class ServerHandlerForObject extends ChannelInboundHandlerAdapter {
    Object msg;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送过来的消息
        if(msg instanceof TestModel){
            System.out.println("[服务端]：收到客户端" + ctx.channel().remoteAddress() + "发送的消息: " + msg.toString());
        }else {
            //传输到下一个处理器
            System.out.println("[服务端]：非object ，转发给下一个处理器");
            ctx.fireChannelRead(msg);
        }

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //解析完收到的消息后要回传给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,正在处理或转发", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //异常捕获，以及对应处理，这里的处理为关闭通道
        System.out.println("[服务端]:发现异常,异常为: "+cause.toString());
        ctx.fireChannelActive();
    }
}

class ObjectDecoder1 extends ByteToMessageDecoder {
    // 消息头：发送端写的是一个int，占用4字节。
    private final static int HEAD_LENGTH = 4;
    private final static int STRING_LENGTH=1048576;
    private final ObjectDecoder objectDecoder=new ObjectDecoder(s->TestModel.class);
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        Charset charset = Charset.defaultCharset();
        //
        if (in.readableBytes() < HEAD_LENGTH) {
            return;
        }
        String msg=in.toString(charset);
        byte[] bu=msg.getBytes ();
        if(bu.length<STRING_LENGTH){
            // 标记一下当前的readIndex的位置
            in.markReaderIndex();

            // 读取数据长度
            int dataLength = in.readInt();
            // 我们读到的消息体长度为0，这是不应该出现的情况，这里出现这情况，关闭连接。
            if (dataLength < 0) {
                ctx.close();
            }

            //读到的消息体长度如果小于我们传送过来的消息长度，则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return;
            }

            // 将缓冲区的数据读到字节数组
            byte[] body = new byte[dataLength];
            in.readBytes(body);
            //将byte数据转化为我们需要的对象。
            Object msg1 = convertToObj(body);
            out.add(msg1);
        }
        return;

    }

    private Object convertToObj(byte[] body) {
        return new String(body,0,body.length);
    }
}

